home *** CD-ROM | disk | FTP | other *** search
/ PC Open 101 / PC Open 101 CD 1.bin / CD1 / INTERNET / EMAIL / pop file / setup.exe / POPFile / MQ.pm < prev    next >
Encoding:
Perl POD Document  |  2004-09-21  |  11.4 KB  |  399 lines

  1. # POPFILE LOADABLE MODULE
  2. package POPFile::MQ;
  3.  
  4. use POPFile::Module;
  5. @ISA = ( "POPFile::Module" );
  6.  
  7. #----------------------------------------------------------------------------
  8. #
  9. # This module handles POPFile's message queue.  Every POPFile::Module is
  10. # able to register with the MQ for specific message types and can also
  11. # send messages without having to know which modules need to receive
  12. # its messages.
  13. #
  14. # Message delivery is asynchronous and guaranteed, as well as guaranteed 
  15. # first in, first out (FIFO) per process.
  16. #
  17. # The following public functions are defined:
  18. #
  19. # register() - register for a specific message type and pass an object
  20. #              reference.  will call that object's deliver() method to
  21. #              deliver messages
  22. #
  23. # post()     - send a message of a specific type
  24. #
  25. # The current list of types is
  26. #
  27. #     UIREG    Register a UI component, message is the component type
  28. #              and the element and reference to the
  29. #              object registering (comes from any component)
  30. #
  31. #     TICKD    Occurs when an hour has passed since the last TICKD (this
  32. #              is generated by the POPFile::Logger module)
  33. #
  34. #     LOGIN    Occurs when a proxy logs into a remote server, the message
  35. #              is the username sent
  36. #
  37. #     COMIT    Sent when an item is committed to the history through a call
  38. #              to POPFile::History::commit_slot
  39. #
  40. #    RELSE    Sent when a session key is being released by a client
  41. #
  42. # Copyright (c) 2001-2004 John Graham-Cumming
  43. #
  44. #   This file is part of POPFile
  45. #
  46. #   POPFile is free software; you can redistribute it and/or modify
  47. #   it under the terms of the GNU General Public License as published by
  48. #   the Free Software Foundation; either version 2 of the License, or
  49. #   (at your option) any later version.
  50. #
  51. #   POPFile is distributed in the hope that it will be useful,
  52. #   but WITHOUT ANY WARRANTY; without even the implied warranty of
  53. #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  54. #   GNU General Public License for more details.
  55. #
  56. #   You should have received a copy of the GNU General Public License
  57. #   along with POPFile; if not, write to the Free Software
  58. #   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  59. #
  60. #----------------------------------------------------------------------------
  61.  
  62. use strict;
  63. use warnings;
  64. use locale;
  65.  
  66. use POSIX ":sys_wait_h";
  67.  
  68. #----------------------------------------------------------------------------
  69. # new
  70. #
  71. #   Class new() function
  72. #----------------------------------------------------------------------------
  73. sub new
  74. {
  75.     my $type = shift;
  76.     my $self = POPFile::Module->new();
  77.  
  78.     # These are the individual queues of message, indexed by type
  79.     # and written to by post().
  80.  
  81.     $self->{queue__} = {};
  82.  
  83.     # These are the registered objects for each type
  84.  
  85.     $self->{waiters__} = {};
  86.  
  87.     # List of file handles to read from active children, this
  88.     # maps the PID for each child to its associated pipe handle
  89.  
  90.     $self->{children__}        = {};
  91.  
  92.     # Record the parent process ID so that we can tell when post is
  93.     # called whether we are in a child process or not
  94.  
  95.     $self->{pid__}             = $$;
  96.  
  97.     bless $self, $type;
  98.  
  99.     $self->name( 'mq' );
  100.  
  101.     return $self;
  102. }
  103.  
  104. #----------------------------------------------------------------------------
  105. #
  106. # service
  107. #
  108. # Called to handle pending tasks for the module.  Here we flush all queues
  109. #
  110. #----------------------------------------------------------------------------
  111. sub service
  112. {
  113.     my ( $self ) = @_;
  114.  
  115.     # See if any of the children have passed up messages through their
  116.     # pipes and deal with it now
  117.  
  118.     for my $kid (keys %{$self->{children__}}) {
  119.         $self->flush_child_data_( $self->{children__}{$kid} );
  120.     }
  121.  
  122.     # Iterate through all the messages in all the queues
  123.  
  124.     for my $type (sort keys %{$self->{queue__}}) {
  125.          while ( my $ref = shift @{$self->{queue__}{$type}} ) {
  126.              my @message = @$ref;
  127.              my $flat = join(':', @message);
  128.  
  129.              $self->log_( 2, "Message $type ($flat) ready for delivery" );
  130.  
  131.              for my $waiter (@{$self->{waiters__}{$type}}) {
  132.                 $self->log_( 2, "Delivering message $type ($flat) to " .
  133.                     $waiter->name() );
  134.  
  135.                 $waiter->deliver( $type, @message );
  136.             }
  137.         }
  138.     }
  139.  
  140.     return 1;
  141. }
  142.  
  143. #----------------------------------------------------------------------------
  144. #
  145. # stop
  146. #
  147. # Called when POPFile is closing down, this is the last method that
  148. # will get called before the object is destroyed.  There is not return
  149. # value from stop().
  150. #
  151. #----------------------------------------------------------------------------
  152. sub stop
  153. {
  154.     my ( $self ) = @_;
  155.  
  156.     # Call service() so that any remaining items are flushed and delivered
  157.  
  158.     $self->service();
  159.  
  160.     for my $kid (keys %{$self->{children__}}) {
  161.         close $self->{children__}{$kid};
  162.         delete $self->{children__}{$kid};
  163.     }
  164. }
  165.  
  166. #----------------------------------------------------------------------------
  167. #
  168. # yield_
  169. #
  170. # Called by a child process to allow the parent to do work, this only
  171. # does anything in the case where we didn't fork for the child process
  172. #
  173. #----------------------------------------------------------------------------
  174. sub yield_
  175. {
  176.     my ( $self, $pipe, $pid ) = @_;
  177.  
  178.     if ( $pid != 0 ) {
  179.         $self->flush_child_data_( $pipe )
  180.     }
  181. }
  182.  
  183. #----------------------------------------------------------------------------
  184. #
  185. # forked
  186. #
  187. # This is called when some module forks POPFile and is within the
  188. # context of the child process so that this module can close any
  189. # duplicated file handles that are not needed.
  190. #
  191. # $writer            The writing end of a pipe that can be used to send up from
  192. #                    the child
  193. #
  194. # There is no return value from this method
  195. #
  196. #----------------------------------------------------------------------------
  197. sub forked
  198. {
  199.     my ( $self, $writer ) = @_;
  200.  
  201.     $self->{writer__} = $writer;
  202.  
  203.     for my $kid (keys %{$self->{children__}}) {
  204.         close $self->{children__}{$kid};
  205.         delete $self->{children__}{$kid};
  206.     }
  207. }
  208.  
  209. #----------------------------------------------------------------------------
  210. #
  211. # postfork
  212. #
  213. # This is called when some module has just forked POPFile.  It is
  214. # called in the parent process.
  215. #
  216. # $pid              The process ID of the new child process
  217. # $reader      The reading end of a pipe that can be used to read messages
  218. # from the child
  219. #
  220. # There is no return value from this method
  221. #
  222. #----------------------------------------------------------------------------
  223. sub postfork
  224. {
  225.     my ( $self, $pid, $reader ) = @_;
  226.  
  227.     $self->{children__}{"$pid"} = $reader;
  228. }
  229.  
  230. #----------------------------------------------------------------------------
  231. #
  232. # reaper
  233. #
  234. # Called when a child process terminates somewhere in POPFile.  The
  235. # object should check to see if it was one of its children and do any
  236. # necessary processing by calling waitpid() on any child handles it
  237. # has
  238. #
  239. # There is no return value from this method
  240. #
  241. #----------------------------------------------------------------------------
  242. sub reaper
  243. {
  244.     my ( $self ) = @_;
  245.  
  246.     # Look for children that have completed and then flush the data
  247.     # from their associated pipe and see if any of our children have
  248.     # data ready to read from their pipes,
  249.  
  250.     my @kids = keys %{$self->{children__}};
  251.  
  252.     if ( $#kids >= 0 ) {
  253.         for my $kid (@kids) {
  254.             if ( waitpid( $kid, &WNOHANG ) == $kid ) {
  255.                 $self->flush_child_data_( $self->{children__}{$kid} );
  256.                 close $self->{children__}{$kid};
  257.                 delete $self->{children__}{$kid};
  258.  
  259.                 $self->log_( 0, "Done with $kid (" . scalar(keys %{$self->{children__}}) . " to go)" );
  260.             }
  261.         }
  262.     }
  263. }
  264.  
  265. #----------------------------------------------------------------------------
  266. #
  267. # read_pipe_
  268. #
  269. # reads a single message from a pipe in a cross-platform way.
  270. # returns undef if the pipe has no message
  271. #
  272. # $handle   The handle of the pipe to read
  273. #
  274. #----------------------------------------------------------------------------
  275. sub read_pipe_
  276. {
  277.     my ( $self, $handle ) = @_;
  278.  
  279.     if ( $^O eq "MSWin32" ) {
  280.  
  281.         # bypasses bug in -s $pipe under ActivePerl
  282.  
  283.         my $message;         # PROFILE PLATFORM START MSWin32
  284.  
  285.         if ( &{ $self->{pipeready_} }($handle) ) {
  286.  
  287.             # add data to the pipe cache whenever the pipe is ready
  288.  
  289.             sysread($handle, my $string, -s $handle);
  290.  
  291.             # push messages onto the end of our cache
  292.  
  293.             $self->{pipe_cache__} .= $string;
  294.         }
  295.  
  296.         # pop the oldest message;
  297.  
  298.         $message = $1 if (defined($self->{pipe_cache__}) &&
  299.                           ( $self->{pipe_cache__} =~ s/(.*?\n)// ) );
  300.  
  301.         return $message;        # PROFILE PLATFORM STOP
  302.     } else {
  303.  
  304.         # do things normally
  305.  
  306.         if ( &{ $self->{pipeready_} }($handle) ) {
  307.             return <$handle>;
  308.         }
  309.     }
  310.  
  311.     return undef;
  312. }
  313.  
  314. #----------------------------------------------------------------------------
  315. #
  316. # flush_child_data_
  317. #
  318. # Called to flush data from the pipe of each child as we go, I did
  319. # this because there appears to be a problem on Windows where the pipe
  320. # gets a lot of read data in it and then causes the child not to be
  321. # terminated even though we are done.  Also this is nice because we
  322. # deal with the messages on the fly
  323. #
  324. # $handle   The handle of the child's pipe
  325. #
  326. #----------------------------------------------------------------------------
  327.  
  328. sub flush_child_data_
  329. {
  330.     my ( $self, $handle ) = @_;
  331.  
  332.     my $stats_changed = 0;
  333.     my $message;
  334.  
  335.     while ( ($message = $self->read_pipe_( $handle )) && defined($message) )
  336.     {
  337.         if ( $message =~ /([^:]+):([^\r\n]*)/ ) {
  338.             my @parameters = split( ':', $2 || '' );
  339.             $self->post( $1, @parameters );
  340.         }
  341.     }
  342. }
  343.  
  344. #----------------------------------------------------------------------------
  345. #
  346. # register
  347. #
  348. #   When a module wants to receive specific message types it calls this
  349. #   method with the type of message is wants to receive and the address
  350. #   of a callback function that will receive the messages
  351. #
  352. #   $type        A string identifying the message type
  353. #   $callback    Reference to a function that takes three parameters
  354. #
  355. #----------------------------------------------------------------------------
  356. sub register
  357. {
  358.     my ( $self, $type, $callback ) = @_;
  359.  
  360.     push @{$self->{waiters__}{$type}}, ( $callback );
  361. }
  362.  
  363. #----------------------------------------------------------------------------
  364. #
  365. # post
  366. #
  367. #   Called to send a message through the message queue
  368. #
  369. #   $type        A string identifying the message type
  370. #   @message     The message (list of parameters)
  371. #
  372. #----------------------------------------------------------------------------
  373. sub post
  374. {
  375.     my ( $self, $type, @message ) = @_;
  376.  
  377.     my $flat = join( ':', @message );
  378.     $self->log_( 2, "post $type ($flat)" );
  379.  
  380.     # If we are in the parent process then just stick this on the queue,
  381.     # otherwise write it up the pipe.
  382.  
  383.     if ( $$ == $self->{pid__} ) {
  384.         if ( exists( $self->{waiters__}{$type} ) ) {
  385.             $self->log_( 2, "queuing post $type ($flat)" );
  386.             push @{$self->{queue__}{$type}}, \@message;
  387.             $self->log_( 2, "$type queue length now " . $#{$self->{queue__}{$type}} );
  388.         } else {
  389.             $self->log_( 2, "dropping post $type ($flat)" );
  390.         }
  391.     } else {
  392.         my $pipe = $self->{writer__};
  393.         $self->log_( 2, "sending post $type ($flat) to parent $pipe" );
  394.         print $pipe "$type:$flat\n";
  395.     }
  396. }
  397.  
  398. 1;
  399.